package com.bawei.persona.realtime.app.func;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * 上海大数据学院
 * 项目规划及管理：李剑
 * 技术指导及需求分析：郭洵
 * 编程：楚志高
 *
 * @author bawei  bigdata sh
 * @since 2021-06-11
 */
public class RichClickhouseFunction  extends RichSinkFunction<JSONObject> {
    Connection connection = null;
    PreparedStatement  ps = null;
    @Override
    public void open(Configuration parameters) throws Exception {
        String driver = "ru.yandex.clickhouse.ClickHouseDriver" ;
        String  url = "jdbc:clickhouse://hadoop102:8123/default" ;
        String  username = "default" ;
        String  password = "" ;
        try {
            Class.forName(driver);
            connection = DriverManager.getConnection(url, username, password);
            ps = connection.prepareStatement(" insert into product_stats_flink values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)");
        }catch (Exception e ){

        }
    }

    @Override
    public void close() throws Exception {

        try {

            if(ps!=null){
                ps.close();
            }
            if(connection!=null){
                connection.close();
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }

    }

    @Override
    public void invoke(JSONObject value, Context context) throws Exception {
        //特别注意，在调试过程钟发现时间长度与hbaee clickhouse 不一样，窃取保持相同
        ps.setString(1, value.getString("stt").substring(0,19)  );
        ps.setString(2, value.getString("edt").substring(0,19) );
        ps.setLong(3, value.getLong("sku_id")== null ?0L:value.getLong("sku_id") );
        ps.setString(4, value.getString("sku_name"));
        ps.setBigDecimal(5, value.getBigDecimal("sku_price")==null? new BigDecimal(0.0) :value.getBigDecimal("sku_price"));
        ps.setLong(6, value.getLong("spu_id")== null ?0L:value.getLong("spu_id"));
        ps.setString(7, value.getString("spu_name"));
        ps.setLong(8, value.getLong("tm_id")== null ?0L:value.getLong("tm_id"));
        ps.setString(9, value.getString("tm_name"));
        ps.setLong(10, value.getLong("category3_id")== null ?0L:value.getLong("category3_id"));
        ps.setString(11, value.getString("category3_name"));
        ps.setLong(12, value.getLong("display_ct")== null ?0L:value.getLong("display_ct"));
        ps.setLong(13, value.getLong("click_ct")== null ?0L:value.getLong("click_ct"));
        ps.setLong(14, value.getLong("favor_ct")== null ?0L:value.getLong("favor_ct"));
        ps.setLong(15, value.getLong("cart_ct")== null ?0L:value.getLong("cart_ct"));
        ps.setLong(16, value.getLong("order_sku_num")== null ?0L:value.getLong("order_sku_num"));
        ps.setBigDecimal(17, value.getBigDecimal("order_amount")==null? new BigDecimal(0.0) :value.getBigDecimal("order_amount") );
        ps.setLong(18, value.getLong("order_ct")== null ?0L:value.getLong("order_ct"));
        ps.setBigDecimal(19, value.getBigDecimal("payment_amount")==null? new BigDecimal(0.0) :value.getBigDecimal("payment_amount"));
        ps.setLong(20, value.getLong("paid_order_ct")== null ?0L:value.getLong("paid_order_ct"));
        ps.setLong(21, value.getLong("refund_order_ct")== null ?0L:value.getLong("refund_order_ct"));
        ps.setBigDecimal(22, value.getBigDecimal("refund_amount")==null? new BigDecimal(0.0) :value.getBigDecimal("refund_amount"));
        ps.setLong(23, value.getLong("comment_ct")== null ?0L:value.getLong("comment_ct"));
        ps.setLong(24, value.getLong("good_comment_ct")== null ?0L:value.getLong("good_comment_ct"));
        //时间在转化过程中以原始的时间处理时间为准
        ps.setLong(25, value.getLong("ts")== null ?0L:value.getLong("ts") );
        ps.executeUpdate();

    }
}
